/*
Copyright (c) 2023 China Mobile Information Technology Co., Ltd
OpenGauss Operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package service

import (
	"fmt"
	v1 "k8s.io/api/batch/v1"
	coreV1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"
	gsv1 "openGauss-operator/api/v1"
	customClient "openGauss-operator/internal/client"
	"openGauss-operator/internal/util"
	"reflect"
	"strconv"
	"strings"
)

// BackupRecoveryInterface lists the backup/recovery reconciliation entry
// points implemented by BackupRecovery.
type BackupRecoveryInterface interface {
	// ReconcileBackupRecovery reconciles backup/recovery state starting from
	// an OpenGaussCluster object.
	ReconcileBackupRecovery(gs *gsv1.OpenGaussCluster) error
	// ReconcileBrBackupRecovery reconciles backup/recovery state starting from
	// an OpenGaussBackupRecovery object.
	ReconcileBrBackupRecovery(br *gsv1.OpenGaussBackupRecovery) error
}

// BackupRecovery implements BackupRecoveryInterface on top of the project's
// Kubernetes client wrapper.
type BackupRecovery struct {
	// K8sClient is the client used for every cluster read and write issued by
	// the methods of this type.
	K8sClient customClient.K8sClient
}

// Compile-time check that *BackupRecovery satisfies BackupRecoveryInterface.
var _ BackupRecoveryInterface = &BackupRecovery{}

// NewBackupRecovery returns a BackupRecovery that uses the given client,
// exposed through the BackupRecoveryInterface.
func NewBackupRecovery(client customClient.K8sClient) BackupRecoveryInterface {
	reconciler := new(BackupRecovery)
	reconciler.K8sClient = client
	return reconciler
}

// ReconcileBrBackupRecovery runs every reconciliation step for an
// OpenGaussBackupRecovery object in a fixed order and stops at the first
// step that returns an error.
func (r *BackupRecovery) ReconcileBrBackupRecovery(br *gsv1.OpenGaussBackupRecovery) error {
	steps := []func(*gsv1.OpenGaussBackupRecovery) error{
		r.ReconcileBrCronBackup,     // enable / disable scheduled backup
		r.reconcileBrManualBackup,   // start / stop manual backup
		r.reconcileBrManualRecovery, // start / stop manual recovery
		r.EchoOgcStatus,             // echo the resulting status back to the ogc
	}
	for _, step := range steps {
		if err := step(br); err != nil {
			return err
		}
	}
	return nil
}

// EchoOgcStatus copies backup and recovery statuses from br into the
// OpenGaussCluster that has the same namespace and name.
//
// Entries are matched by pod name. br entries without a pod name are skipped,
// as are backups still in util.BackupStageCreate and recoveries in
// util.RecoveryStageStartDn. An error is returned only when the cluster
// cannot be fetched; a failed update is logged and nil is returned.
func (r *BackupRecovery) EchoOgcStatus(br *gsv1.OpenGaussBackupRecovery) error {
	current, err := r.K8sClient.GetOpenGaussCluster(util.GetNamespacedName(br.Namespace, br.Name))
	if err != nil {
		return util.ReturnError("Failed to get ogc", err)
	}
	updated := current.DeepCopy()

	// Echo backup statuses.
	clusterBackups := updated.Spec.BackRecovery.Backups
	for i := range br.Spec.Backups {
		podName, status := br.Spec.Backups[i].PodName, br.Spec.Backups[i].Status
		if podName == "" || status == util.BackupStageCreate {
			continue
		}
		for j := range clusterBackups {
			if clusterBackups[j].PodName == podName {
				clusterBackups[j].Status = status
			}
		}
	}

	// Echo recovery statuses.
	clusterRecoveries := updated.Spec.BackRecovery.Recoveries
	for i := range br.Spec.Recoveries {
		podName, status := br.Spec.Recoveries[i].PodName, br.Spec.Recoveries[i].Status
		if podName == "" || status == util.RecoveryStageStartDn {
			continue
		}
		for j := range clusterRecoveries {
			if clusterRecoveries[j].PodName == podName {
				clusterRecoveries[j].Status = status
			}
		}
	}

	// Nothing changed: no write needed.
	if reflect.DeepEqual(current, updated) {
		return nil
	}
	if err := r.K8sClient.UpdateOpenGaussClusterObject(updated, current); err != nil {
		klog.Error(err)
	}
	return nil
}

// ReconcileBackupRecovery reconciles the OpenGaussBackupRecovery object that
// shares namespace and name with gs.
//
// If that object does not exist it is created from gs, unless the cluster's
// BackRecovery spec is empty, in which case nothing is done. If it exists,
// the scheduled-backup, manual-backup and manual-recovery settings are
// synced in that order; the first error aborts the sequence.
func (r *BackupRecovery) ReconcileBackupRecovery(gs *gsv1.OpenGaussCluster) error {
	key := types.NamespacedName{Namespace: gs.Namespace, Name: gs.Name}

	br, err := r.K8sClient.GetOpenGaussBackupRecovery(key)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		if isEmpty(gs.Spec.BackRecovery) {
			return nil
		}
		// The object does not exist yet: create it.
		return r.K8sClient.Create(r.newBackupRecoveryObject(gs))
	}

	// Scheduled backup.
	if err := r.reconcileCronBackup(gs, br); err != nil {
		return err
	}
	// Manual backup.
	if err := r.reconcileManualBackup(gs, br); err != nil {
		return err
	}
	// Manual recovery.
	return r.reconcileManualRecovery(gs, br)
}

// reconcileManualRecovery syncs the manual-recovery declarations of gs into
// the per-node Recoveries list of br and persists the result when it differs
// from br.
//
// The list is first resized to gs.Spec.DataNodeNum entries. For each node
// index the matching declaration in gs (matched by pod ordinal, and required
// to be named "<br.Name>-<index>") decides the action:
//   - declaration already succeeded or failed: nothing to do;
//   - declaration in util.RecoveryStageCloneDn and no recovery in progress:
//     start one by setting the stage and the backup file;
//   - no declaration: move a non-empty, not yet stopping entry to
//     util.RecoveryStageStopJob.
//
// All edits are made on a deep copy; br itself is never modified.
func (r *BackupRecovery) reconcileManualRecovery(gs *gsv1.OpenGaussCluster, br *gsv1.OpenGaussBackupRecovery) error {
	ogcRecoveries := gs.Spec.BackRecovery.Recoveries
	nodeNum := int(*gs.Spec.DataNodeNum)
	modifyBr := br.DeepCopy()

	// Adapt the list to node scale-in / scale-out.
	if nodeNum < len(br.Spec.Recoveries) {
		// Truncate the deep copy's own slice. Slicing br.Spec.Recoveries here
		// would alias br's backing array, so the writes below would leak into
		// the object later passed to the update call as the original.
		modifyBr.Spec.Recoveries = modifyBr.Spec.Recoveries[:nodeNum]
	} else if nodeNum > len(br.Spec.Recoveries) {
		brLen := len(modifyBr.Spec.Recoveries)
		modifyBr.Spec.Recoveries = initRecoveriesData(gs)
		for i := 0; i < nodeNum; i++ {
			if i >= brLen {
				modifyBr.Spec.Recoveries[i].PodName = getDbPodName(gs.Name, i)
			} else {
				// Existing nodes keep their current recovery state.
				modifyBr.Spec.Recoveries[i] = br.Spec.Recoveries[i]
			}
		}
	}

	for i := 0; i < nodeNum; i++ {
		// Look up the recovery declaration for node i.
		ogcRecoveryStatus := ""
		backupFile := ""
		recoveryExist := false
		for _, ogcRecovery := range ogcRecoveries {
			// Entry without a recovery declaration: skip.
			if ogcRecovery.PodName == "" {
				continue
			}
			// Resolve which node this declaration targets.
			ordinal, err := util.GetPodOrdinal(ogcRecovery.PodName)
			if err != nil || ordinal == -1 {
				klog.Error(fmt.Sprintf("%s get pod ordinal err", ogcRecovery.PodName), err)
				continue
			}
			if i == ordinal {
				if ogcRecovery.PodName != br.Name+"-"+strconv.Itoa(i) {
					klog.Error(fmt.Sprintf("%s recovery pod err", ogcRecovery.PodName))
					break
				}
				// Declaration validated.
				ogcRecoveryStatus = ogcRecovery.Status
				backupFile = ogcRecovery.BackFileName
				recoveryExist = true
			}
		}

		nowStatus := modifyBr.Spec.Recoveries[i].Status
		// Recovery already finished: skip.
		if ogcRecoveryStatus == util.RecoveryStageSucceed || ogcRecoveryStatus == util.RecoveryStageFailed {
			continue
		}

		if ogcRecoveryStatus == util.RecoveryStageCloneDn {
			// A recovery task is already in progress.
			if nowStatus == util.RecoveryStageCloneDn || nowStatus == util.RecoveryStageCreateJob ||
				nowStatus == util.RecoveryStageReadLog || nowStatus == util.RecoveryStageStartDn ||
				nowStatus == util.RecoveryStageJudgeDnStatus {
				continue
			}
			// Start a new recovery task.
			modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageCloneDn
			modifyBr.Spec.Recoveries[i].BackupFile = backupFile
		}

		// Declaration removed: request deletion of the recovery task.
		if !recoveryExist {
			if nowStatus != util.RecoveryStageStopJob && nowStatus != util.RecoveryStageStopJobEnd && nowStatus != "" {
				modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageStopJob
				modifyBr.Spec.Recoveries[i].BackupFile = ""
			}
		}
	}

	if !reflect.DeepEqual(modifyBr, br) {
		if err := r.K8sClient.UpdateOpenGaussBackupRecoveryObject(modifyBr, br); err != nil {
			return err
		}
	}

	return nil
}

// reconcileManualBackup syncs the manual-backup declarations of gs into the
// per-node Backups list of br and persists the result when it differs from
// br.
//
// The list is first resized to gs.Spec.DataNodeNum entries. For each node
// index the matching declaration in gs (matched by pod ordinal, and required
// to be named "<br.Name>-<index>") decides the action:
//   - declaration already succeeded or failed: nothing to do;
//   - declaration in util.BackupStageCreate and no backup in progress: start
//     one by setting the stage;
//   - no declaration: move a non-empty, not yet stopping entry to
//     util.BackupStageStopJob.
//
// All edits are made on a deep copy; br itself is never modified.
func (r *BackupRecovery) reconcileManualBackup(gs *gsv1.OpenGaussCluster, br *gsv1.OpenGaussBackupRecovery) error {
	ogcBackups := gs.Spec.BackRecovery.Backups
	nodeNum := int(*gs.Spec.DataNodeNum)

	modifyBr := br.DeepCopy()

	// Adapt the list to node scale-in / scale-out.
	if nodeNum < len(br.Spec.Backups) {
		// Truncate the deep copy's own slice. Slicing br.Spec.Backups here
		// would alias br's backing array, so the writes below would leak into
		// the object later passed to the update call as the original.
		modifyBr.Spec.Backups = modifyBr.Spec.Backups[:nodeNum]
	} else if nodeNum > len(br.Spec.Backups) {
		brLen := len(modifyBr.Spec.Backups)
		modifyBr.Spec.Backups = initBackupsData(gs)
		for i := 0; i < nodeNum; i++ {
			if i >= brLen {
				modifyBr.Spec.Backups[i].PodName = getDbPodName(gs.Name, i)
			} else {
				// Existing nodes keep their current backup state.
				modifyBr.Spec.Backups[i] = br.Spec.Backups[i]
			}
		}
	}

	for i := 0; i < nodeNum; i++ {
		// Look up the backup declaration for node i.
		backupExist := false
		ogcStatus := ""
		for _, ogcBackup := range ogcBackups {
			podName := ogcBackup.PodName
			ordinal, err := util.GetPodOrdinal(podName)
			if err != nil || ordinal == -1 {
				klog.Error(fmt.Sprintf("%s get backup pod index err", podName))
				continue
			}
			if i == ordinal {
				if podName != br.Name+"-"+strconv.Itoa(i) {
					klog.Error(fmt.Sprintf("%s backup pod err", podName))
					break
				}
				backupExist = true
				ogcStatus = ogcBackup.Status
			}
		}
		nowStatus := modifyBr.Spec.Backups[i].Status
		// Backup already finished: skip.
		if ogcStatus == util.BackupStageSucceed || ogcStatus == util.BackupStageFailed {
			continue
		}

		if ogcStatus == util.BackupStageCreate {
			// A backup task is already in progress.
			if nowStatus == util.BackupStageCreate || nowStatus == util.BackupStageRunning {
				continue
			}
			// Start a new backup task.
			modifyBr.Spec.Backups[i].Status = util.BackupStageCreate
		}
		// Declaration removed: request deletion of the backup task.
		if !backupExist {
			if nowStatus != "" && nowStatus != util.BackupStageStopJob && nowStatus != util.BackupStageStopJobEnd {
				modifyBr.Spec.Backups[i].Status = util.BackupStageStopJob
			}
		}
	}

	if !reflect.DeepEqual(modifyBr, br) {
		if err := r.K8sClient.UpdateOpenGaussBackupRecoveryObject(modifyBr, br); err != nil {
			return err
		}
	}

	return nil
}

// newBackupRecoveryObject builds the OpenGaussBackupRecovery object for gs.
//
// The object reuses the cluster's name, namespace, labels and annotations and
// is owned by gs. The scheduled-backup pod name is gs.Name plus
// util.TempBackupPodSuffix, or empty when no schedule is configured.
func (r *BackupRecovery) newBackupRecoveryObject(gs *gsv1.OpenGaussCluster) *gsv1.OpenGaussBackupRecovery {
	// TODO: the backup pod is fixed to the "-0" node; dynamic selection of a
	// standby node for backup is to be built later.
	cronPodName := ""
	if gs.Spec.BackRecovery.CronBackupTime != "" {
		cronPodName = gs.Name + util.TempBackupPodSuffix
	}

	meta := metaV1.ObjectMeta{
		Name:            gs.Name,
		Namespace:       gs.Namespace,
		Labels:          gs.Labels,
		Annotations:     gs.Annotations,
		OwnerReferences: util.MakeOwnerRef(gs),
	}
	spec := gsv1.OpenGaussBackupRecoverySpec{
		Transfer: gsv1.Transfer{
			Address: gs.Spec.BackRecovery.Transfer.Address,
			Mode:    gs.Spec.BackRecovery.Transfer.Mode,
		},
		CronBackup: gsv1.CronBackupData{
			Time:    gs.Spec.BackRecovery.CronBackupTime,
			PodName: cronPodName,
		},
		Backups:    initBackupsData(gs),
		Recoveries: initRecoveriesData(gs),
		Image:      gs.Spec.Images.GaussBackupRecovery,
	}
	return &gsv1.OpenGaussBackupRecovery{ObjectMeta: meta, Spec: spec}
}

// initRecoveriesData builds the initial per-node recovery list for gs: one
// entry per data node, each carrying its pod name. Nodes that have a recovery
// declaration in gs start in util.RecoveryStageCloneDn with the declared
// backup file.
//
// Declarations whose pod ordinal cannot be parsed, is negative, or does not
// address an existing data node are ignored, so a stale or mistyped pod name
// cannot index past the end of the list.
func initRecoveriesData(gs *gsv1.OpenGaussCluster) []gsv1.BrData {
	data := make([]gsv1.BrData, *gs.Spec.DataNodeNum)
	for i := range data {
		data[i].PodName = getDbPodName(gs.Name, i)
	}
	for _, recovery := range gs.Spec.BackRecovery.Recoveries {
		ordinal, err := util.GetPodOrdinal(recovery.PodName)
		if err != nil || ordinal < 0 || ordinal >= len(data) {
			continue
		}
		data[ordinal].Status = util.RecoveryStageCloneDn
		data[ordinal].BackupFile = recovery.BackFileName
	}
	return data
}

// initBackupsData builds the initial per-node backup list for gs: one entry
// per data node, each carrying its pod name. Nodes that have a backup
// declaration in gs start in util.BackupStageCreate.
//
// Declarations whose pod ordinal cannot be parsed, is negative (the rest of
// this file treats -1 as "no ordinal"), or does not address an existing data
// node are ignored instead of indexing outside the list.
func initBackupsData(gs *gsv1.OpenGaussCluster) []gsv1.BrData {
	data := make([]gsv1.BrData, *gs.Spec.DataNodeNum)
	for i := range data {
		data[i].PodName = getDbPodName(gs.Name, i)
	}
	for _, backup := range gs.Spec.BackRecovery.Backups {
		ordinal, err := util.GetPodOrdinal(backup.PodName)
		if err != nil || ordinal < 0 || ordinal >= len(data) {
			continue
		}
		data[ordinal].Status = util.BackupStageCreate
	}
	return data
}

// reconcileCronBackup brings the transfer settings, scheduled-backup settings
// and image of br in line with gs. No write is issued when
// equalsWithCronBackup reports that they already match.
func (r *BackupRecovery) reconcileCronBackup(gs *gsv1.OpenGaussCluster, br *gsv1.OpenGaussBackupRecovery) error {
	// TODO: the backup pod is fixed to the "-0" node; dynamic selection of a
	// standby node for backup is to be built later.
	cronPodName := gs.Name + util.TempBackupPodSuffix
	if equalsWithCronBackup(gs, br, cronPodName) {
		return nil
	}

	// Update the scheduled-backup, transfer and image fields on a copy.
	desired := br.DeepCopy()
	desired.Spec.Image = gs.Spec.Images.GaussBackupRecovery
	desired.Spec.Transfer = gs.Spec.BackRecovery.Transfer
	desired.Spec.CronBackup = gsv1.CronBackupData{
		Time:    gs.Spec.BackRecovery.CronBackupTime,
		PodName: cronPodName,
	}
	return r.K8sClient.UpdateOpenGaussBackupRecoveryObject(desired, br)
}

// ReconcileBrCronBackup makes the scheduled-backup CronJob match
// br.Spec.CronBackup.
//
// With an empty schedule an existing CronJob is deleted. With a schedule set,
// a missing CronJob is created and an existing one is patched when
// judgeCronJobMsg reports a difference.
func (r *BackupRecovery) ReconcileBrCronBackup(br *gsv1.OpenGaussBackupRecovery) error {
	cronJob, err := r.K8sClient.GetCronJob(getCronJobName(br.Name), br.Namespace)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	found := err == nil
	wanted := br.Spec.CronBackup.Time != ""

	switch {
	case !wanted && !found:
		// Scheduled backup is already off: nothing to do.
		return nil
	case !wanted:
		// Turn scheduled backup off.
		return r.K8sClient.DeleteCronJob(cronJob)
	case !found:
		// Turn scheduled backup on.
		return r.K8sClient.Create(r.newCronJob(br))
	}

	// The CronJob exists and is wanted: patch it if its settings drifted.
	if changed, patched := judgeCronJobMsg(br, cronJob); changed {
		return r.K8sClient.PatchCronJob(patched, cronJob)
	}
	return nil
}

// newCronJob builds the scheduled-backup CronJob for br. It is owned by br,
// reuses its labels and annotations, and runs on the schedule in
// br.Spec.CronBackup.Time.
func (r *BackupRecovery) newCronJob(br *gsv1.OpenGaussBackupRecovery) *v1.CronJob {
	meta := metaV1.ObjectMeta{
		Name:            getCronJobName(br.Name),
		Namespace:       br.Namespace,
		Labels:          br.Labels,
		Annotations:     br.Annotations,
		OwnerReferences: util.MakeBrOwnerRef(br),
	}
	spec := v1.CronJobSpec{
		Schedule:    br.Spec.CronBackup.Time,
		JobTemplate: r.cronJobTemplate(br),
	}
	return &v1.CronJob{ObjectMeta: meta, Spec: spec}
}

// cronJobTemplate builds the job template used by the scheduled-backup
// CronJob: a single backup container that mounts the data volume of the pod
// named in br.Spec.CronBackup.PodName and is bound to that pod through pod
// affinity. The error from util.GetPodOrdinal is discarded.
func (r *BackupRecovery) cronJobTemplate(br *gsv1.OpenGaussBackupRecovery) v1.JobTemplateSpec {
	targetPod := br.Spec.CronBackup.PodName
	ordinal, _ := util.GetPodOrdinal(targetPod)
	retries := int32(util.BackoffLimit)

	volumes := []coreV1.Volume{*SetVolumeGaussData(br.Name, ordinal)}
	container := coreV1.Container{
		Name:         util.CronBackupContainerName,
		Image:        br.Spec.Image,
		Env:          r.setBrEnv(br, targetPod),
		Resources:    backupResources(),
		VolumeMounts: brVolumeMount(),
	}
	podSpec := coreV1.PodSpec{
		Volumes:       volumes,
		Containers:    []coreV1.Container{container},
		Affinity:      brAffinity(br, targetPod),
		RestartPolicy: util.BackupRestartPolicyNever,
	}

	return v1.JobTemplateSpec{
		Spec: v1.JobSpec{
			BackoffLimit: &retries,
			Template:     coreV1.PodTemplateSpec{Spec: podSpec},
		},
	}
}

// setRecoveryEnv returns the environment for a recovery container: the
// common variables from setBrEnv for the pod at the given ordinal, followed
// by the backup file recorded in br.Spec.Recoveries[ordinal].
func (r *BackupRecovery) setRecoveryEnv(br *gsv1.OpenGaussBackupRecovery, ordinal int) []coreV1.EnvVar {
	vars := r.setBrEnv(br, getDbPodName(br.Name, ordinal))
	fileVar := coreV1.EnvVar{
		Name:  util.EnvNameBackupFile,
		Value: br.Spec.Recoveries[ordinal].BackupFile,
	}
	return append(vars, fileVar)
}

// setBrEnv returns the environment variables shared by backup and recovery
// containers: transfer address and mode, credentials, and the IPv4 address
// of the pod named podName in br's namespace.
//
// If the pod cannot be fetched the error is logged and the instance IP
// variable is left empty; the pod object is only read after a successful
// lookup, so a failed lookup cannot cause a nil dereference.
func (r *BackupRecovery) setBrEnv(br *gsv1.OpenGaussBackupRecovery, podName string) []coreV1.EnvVar {
	ipv4 := ""
	pod, err := r.K8sClient.GetPod(br.Namespace, podName)
	if err != nil {
		klog.Error(err)
	} else {
		// Keep the last address for which util.ParseIP reports 4 as its
		// second result (presumably the IP version — confirm in util).
		for _, podIP := range pod.Status.PodIPs {
			if _, version := util.ParseIP(podIP.IP); version == 4 {
				ipv4 = podIP.IP
			}
		}
	}
	return []coreV1.EnvVar{
		util.SetOneEnv(util.EnvNameAddress, br.Spec.Transfer.Address),
		util.SetOneEnv(util.EnvNameMode, br.Spec.Transfer.Mode),
		// NOTE(review): hard-coded credential literal; consider sourcing it
		// from a Secret or from util like the other credentials.
		util.SetOneEnv("GS_PASSWORD", "Test@123"),
		util.SetOneEnv(util.EnvNameBrUsername, util.BrUsername),
		util.SetOneEnv(util.EnvNameBrPassword, util.BrPassword),
		util.SetOneEnv(util.EnvNameTransferUsername, util.TransferUsername),
		util.SetOneEnv(util.EnvNameTransferPassword, util.TransferPassword),
		util.SetOneEnv(util.EnvNameInstanceIp, ipv4),
	}
}

// brAffinity returns a required pod-affinity rule with topology key
// util.HostnameKey. It selects, within br's namespace, the pod whose labels
// match both the cluster name (util.OgcNameKey = br.Name) and the pod name
// (util.PodNameKey = podName).
func brAffinity(br *gsv1.OpenGaussBackupRecovery, podName string) *coreV1.Affinity {
	selector := &metaV1.LabelSelector{
		MatchExpressions: []metaV1.LabelSelectorRequirement{
			{Key: util.OgcNameKey, Values: []string{br.Name}, Operator: util.OperatorIn},
			{Key: util.PodNameKey, Values: []string{podName}, Operator: util.OperatorIn},
		},
	}
	term := coreV1.PodAffinityTerm{
		LabelSelector: selector,
		Namespaces:    []string{br.Namespace},
		TopologyKey:   util.HostnameKey,
	}
	return &coreV1.Affinity{
		PodAffinity: &coreV1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []coreV1.PodAffinityTerm{term},
		},
	}
}

// brVolumeMount returns the single volume mount used by backup and recovery
// containers: volume util.VolumeGaussData mounted at util.MountPathBrData.
func brVolumeMount() []coreV1.VolumeMount {
	var mount coreV1.VolumeMount
	mount.Name = util.VolumeGaussData
	mount.MountPath = util.MountPathBrData
	return []coreV1.VolumeMount{mount}
}

// recoveryResources returns the CPU and memory limits and requests for
// recovery containers, parsed from the util.Recovery* quantity strings.
// resource.MustParse panics if any of those strings is not a valid quantity.
func recoveryResources() coreV1.ResourceRequirements {
	limits := coreV1.ResourceList{}
	limits[coreV1.ResourceCPU] = resource.MustParse(util.RecoveryLimitCpu)
	limits[coreV1.ResourceMemory] = resource.MustParse(util.RecoveryLimitMem)

	requests := coreV1.ResourceList{}
	requests[coreV1.ResourceCPU] = resource.MustParse(util.RecoveryRequestCpu)
	requests[coreV1.ResourceMemory] = resource.MustParse(util.RecoveryRequestMem)

	return coreV1.ResourceRequirements{Limits: limits, Requests: requests}
}

// backupResources returns the CPU and memory limits and requests for backup
// containers, parsed from the util.Backup* quantity strings.
// resource.MustParse panics if any of those strings is not a valid quantity.
func backupResources() coreV1.ResourceRequirements {
	limits := coreV1.ResourceList{}
	limits[coreV1.ResourceCPU] = resource.MustParse(util.BackupLimitCpu)
	limits[coreV1.ResourceMemory] = resource.MustParse(util.BackupLimitMem)

	requests := coreV1.ResourceList{}
	requests[coreV1.ResourceCPU] = resource.MustParse(util.BackupRequestCpu)
	requests[coreV1.ResourceMemory] = resource.MustParse(util.BackupRequestMem)

	return coreV1.ResourceRequirements{Limits: limits, Requests: requests}
}

// getCronJobName returns the name of the scheduled-backup CronJob for the
// given object name: the name followed by "-cron-backup".
func getCronJobName(name string) string {
	const suffix = "-cron-backup"
	// fmt.Sprint inserts no separator between string operands.
	return fmt.Sprint(name, suffix)
}

// getManualBackupJobName returns the name of the manual-backup Job for the
// given pod: the pod name followed by "-manual-backup".
func getManualBackupJobName(podName string) string {
	const suffix = "-manual-backup"
	// fmt.Sprint inserts no separator between string operands.
	return fmt.Sprint(podName, suffix)
}
// getManualRecoveryJobName returns the name of the manual-recovery Job for
// the given pod: the pod name followed by "-manual-recovery".
func getManualRecoveryJobName(podName string) string {
	const suffix = "-manual-recovery"
	// fmt.Sprint inserts no separator between string operands.
	return fmt.Sprint(podName, suffix)
}

// judgeCronJobMsg compares an existing scheduled-backup CronJob with the
// settings in br and returns whether it needs updating, together with a
// modified deep copy of it.
//
// Only the first container of the job template is inspected. The schedule,
// the image, and the env vars named util.EnvNameMode and util.EnvNameAddress
// are compared. Drifted env vars are always rewritten in the copy; schedule
// and image are written into the copy only when some difference was found.
func judgeCronJobMsg(br *gsv1.OpenGaussBackupRecovery, cronJob *v1.CronJob) (bool, *v1.CronJob) {
	patched := cronJob.DeepCopy()
	current := &cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
	target := &patched.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
	wantMode, wantAddress := br.Spec.Transfer.Mode, br.Spec.Transfer.Address

	// Schedule and image.
	changed := br.Spec.CronBackup.Time != cronJob.Spec.Schedule ||
		br.Spec.Image != current.Image

	// Transfer-related environment variables.
	for i := range current.Env {
		existing := current.Env[i]
		if existing.Name == util.EnvNameMode {
			if existing.Value != wantMode {
				changed = true
				target.Env[i] = util.SetOneEnv(util.EnvNameMode, wantMode)
			}
		} else if existing.Name == util.EnvNameAddress {
			if existing.Value != wantAddress {
				changed = true
				target.Env[i] = util.SetOneEnv(util.EnvNameAddress, wantAddress)
			}
		}
	}

	if !changed {
		return false, patched
	}
	patched.Spec.Schedule = br.Spec.CronBackup.Time
	target.Image = br.Spec.Image
	return true, patched
}

// reconcileBrManualBackup advances the manual-backup state machine of every
// entry in br.Spec.Backups by one step and persists the result when anything
// changed.
//
// Per-entry stages:
//   - util.BackupStageCreate: delete a leftover job if one exists; once no
//     job is found, move to util.BackupStageRunning.
//   - util.BackupStageRunning: create the backup job if it is missing,
//     otherwise derive the next status from the job via
//     dealManualBackupStatus.
//   - util.BackupStageStopJob: delete the job if it still exists, then move
//     to util.BackupStageStopJobEnd. A job that is already gone also ends the
//     stop stage, matching the recovery counterpart.
//   - any other stage: left untouched.
//
// The first API error aborts the loop and is returned; changes staged for
// earlier entries are not persisted in that case.
func (r *BackupRecovery) reconcileBrManualBackup(br *gsv1.OpenGaussBackupRecovery) error {
	// TODO: after scale-in, backup jobs of the removed nodes are not deleted.
	modifyBr := br.DeepCopy()
	for i, brBackup := range br.Spec.Backups {
		switch brBackup.Status {
		case util.BackupStageCreate:
			// Delete a leftover job first; with no job, enter the next stage.
			backupJob, err := r.K8sClient.GetJob(brBackup.JobName, br.Namespace)
			if err != nil {
				if errors.IsNotFound(err) {
					modifyBr.Spec.Backups[i].Status = util.BackupStageRunning
					continue
				}
				return util.ReturnError("Find backup job error ", err)
			}
			if err := r.K8sClient.DeleteJob(backupJob); err != nil {
				return util.ReturnError("Delete backup job", err)
			}

		case util.BackupStageRunning:
			// Create the job if needed, otherwise evaluate its result.
			job, err := r.K8sClient.GetJob(brBackup.JobName, br.Namespace)
			if err != nil {
				if !errors.IsNotFound(err) {
					return util.ReturnError("Get manual backup job error", err)
				}
				// The job does not exist yet: create it.
				newBackup := r.newBackupJob(br, brBackup)
				modifyBr.Spec.Backups[i].JobName = getManualBackupJobName(brBackup.PodName)
				if err := r.K8sClient.Create(newBackup); err != nil {
					return util.ReturnError("Create backup job error ", err)
				}
				continue
			}

			status, err := r.dealManualBackupStatus(job)
			if err != nil {
				return err
			}
			modifyBr.Spec.Backups[i].Status = status

		case util.BackupStageStopJob:
			// Delete the backup job, then mark the stop as finished.
			job, err := r.K8sClient.GetJob(getManualBackupJobName(brBackup.PodName), br.Namespace)
			if err != nil && !errors.IsNotFound(err) {
				return util.ReturnError("Get manual backup job error", err)
			}
			if err == nil {
				if err := r.K8sClient.DeleteJob(job); err != nil {
					return util.ReturnError("Delete backup job", err)
				}
			}
			// Reached whether the job was deleted here or was already gone,
			// so the entry cannot stay stuck in the stop stage.
			modifyBr.Spec.Backups[i].Status = util.BackupStageStopJobEnd
		}
	}

	if !reflect.DeepEqual(modifyBr, br) {
		if err := r.K8sClient.UpdateOpenGaussBackupRecoveryObject(modifyBr, br); err != nil {
			return err
		}
	}
	return nil
}

// dealManualBackupStatus derives the backup stage from a manual-backup job.
//
// When the job reports exactly one succeeded pod, the log of the first pod
// whose phase equals util.JobPodCompletedStatus decides the outcome:
// util.BackupStageSucceed if it contains util.BackupSucceedLogSign,
// otherwise util.BackupStageFailed. If no such pod is found, or the job has
// not succeeded, exactly one failed pod yields util.BackupStageFailed and
// anything else yields util.BackupStageRunning. Errors listing pods or
// reading the log are returned with an empty stage.
func (r *BackupRecovery) dealManualBackupStatus(job *v1.Job) (string, error) {
	if job.Status.Succeeded == 1 {
		pods, err := r.K8sClient.GetPodList(job.Namespace, job.Spec.Selector.MatchLabels)
		if err != nil {
			return "", err
		}
		for _, pod := range pods {
			if pod.Status.Phase != util.JobPodCompletedStatus {
				continue
			}
			podLog, err := r.K8sClient.GetPodLog(pod.Namespace, pod.Name)
			if err != nil {
				return "", err
			}
			if !strings.Contains(podLog, util.BackupSucceedLogSign) {
				return util.BackupStageFailed, nil
			}
			return util.BackupStageSucceed, nil
		}
	}

	if job.Status.Failed == 1 {
		return util.BackupStageFailed, nil
	}
	return util.BackupStageRunning, nil
}

// newRecoveryJob builds the manual-recovery Job for the node described by
// bd. The job is owned by br, mounts that node's data volume, runs a single
// recovery container and is bound to the target pod through pod affinity.
// The error from util.GetPodOrdinal is discarded.
func (r *BackupRecovery) newRecoveryJob(br *gsv1.OpenGaussBackupRecovery, bd gsv1.BrData) *v1.Job {
	ordinal, _ := util.GetPodOrdinal(bd.PodName)
	retries := int32(util.BackoffLimit)
	ttlSeconds := int32(util.TTLSecondsAfterFinished)

	meta := metaV1.ObjectMeta{
		Name:            getManualRecoveryJobName(bd.PodName),
		Namespace:       br.Namespace,
		Labels:          br.Labels,
		OwnerReferences: util.MakeBrOwnerRef(br),
	}
	volumes := []coreV1.Volume{*SetVolumeGaussData(br.Name, ordinal)}
	container := coreV1.Container{
		Name:         util.ManualRecoveryContainerName,
		Image:        br.Spec.Image,
		Env:          r.setRecoveryEnv(br, ordinal),
		Resources:    recoveryResources(),
		VolumeMounts: brVolumeMount(),
	}
	podSpec := coreV1.PodSpec{
		Volumes:       volumes,
		Containers:    []coreV1.Container{container},
		Affinity:      brAffinity(br, bd.PodName),
		RestartPolicy: util.BackupRestartPolicyNever,
	}

	return &v1.Job{
		ObjectMeta: meta,
		Spec: v1.JobSpec{
			TTLSecondsAfterFinished: &ttlSeconds,
			BackoffLimit:            &retries,
			Template:                coreV1.PodTemplateSpec{Spec: podSpec},
		},
	}
}

// newBackupJob builds the manual-backup Job for the node described by bd.
// The job is owned by br, mounts that node's data volume, runs a single
// backup container and is bound to the target pod through pod affinity.
// The error from util.GetPodOrdinal is discarded.
func (r *BackupRecovery) newBackupJob(br *gsv1.OpenGaussBackupRecovery, bd gsv1.BrData) *v1.Job {
	ttlSeconds := int32(util.TTLSecondsAfterFinished)
	ordinal, _ := util.GetPodOrdinal(bd.PodName)
	retries := int32(util.BackoffLimit)

	meta := metaV1.ObjectMeta{
		Name:            getManualBackupJobName(bd.PodName),
		Namespace:       br.Namespace,
		Labels:          br.Labels,
		OwnerReferences: util.MakeBrOwnerRef(br),
	}
	volumes := []coreV1.Volume{*SetVolumeGaussData(br.Name, ordinal)}
	container := coreV1.Container{
		Name:         util.ManualBackupContainerName,
		Image:        br.Spec.Image,
		Env:          r.setBrEnv(br, bd.PodName),
		Resources:    backupResources(),
		VolumeMounts: brVolumeMount(),
	}
	podSpec := coreV1.PodSpec{
		Volumes:       volumes,
		Containers:    []coreV1.Container{container},
		Affinity:      brAffinity(br, bd.PodName),
		RestartPolicy: util.BackupRestartPolicyNever,
	}

	return &v1.Job{
		ObjectMeta: meta,
		Spec: v1.JobSpec{
			TTLSecondsAfterFinished: &ttlSeconds,
			BackoffLimit:            &retries,
			Template:                coreV1.PodTemplateSpec{Spec: podSpec},
		},
	}
}

// reconcileBrManualRecovery advances the manual-recovery state machine of
// every entry in br.Spec.Recoveries by one step and persists the result when
// anything changed.
//
// Per-entry stages:
//   - util.RecoveryStageCloneDn: look up the data node id and stop the data
//     node, then move to util.RecoveryStageCreateJob.
//   - util.RecoveryStageCreateJob: delete a leftover recovery job; once no
//     job is found, move to util.RecoveryStageReadLog.
//   - util.RecoveryStageReadLog: create the recovery job if it is missing,
//     otherwise inspect the job result and the completed pod's log and move
//     to util.RecoveryStageStartDn or util.RecoveryStageFailed.
//   - util.RecoveryStageStartDn: start the data node in the background and
//     move to util.RecoveryStageJudgeDnStatus.
//   - util.RecoveryStageJudgeDnStatus: move to util.RecoveryStageSucceed once
//     the instance reports util.DnStateNormal; otherwise log and retry on the
//     next reconcile.
//   - util.RecoveryStageStopJob: delete the recovery job if present and move
//     to util.RecoveryStageStopJobEnd.
//   - any other stage: left untouched.
//
// Errors returned from a stage abort the loop; changes staged for earlier
// entries are not persisted in that case.
func (r *BackupRecovery) reconcileBrManualRecovery(br *gsv1.OpenGaussBackupRecovery) error {
	// TODO: after scale-in, recovery jobs of the removed nodes are not deleted.
	modifyBr := br.DeepCopy()
	for i, brRecovery := range br.Spec.Recoveries {
		switch brRecovery.Status {
		// Step 1: stop the data node.
		case util.RecoveryStageCloneDn:
			// Get the data node id.
			nodeId, err := r.K8sClient.ExecCommand(br.Namespace, util.ReplaceData(util.Command(util.CommandGetDataNodeId), brRecovery.PodName), brRecovery.PodName)
			if err != nil {
				return util.ReturnError(fmt.Sprintf("Failed to get id in the %s stage", util.RecoveryStageCloneDn), err)
			}
			nodeId = strings.Replace(nodeId, "\n", "", -1)

			// Stop the data node.
			_, err = r.K8sClient.ExecCommand(br.Namespace, util.ReplaceData(util.Command(util.CommandStopDn), nodeId), brRecovery.PodName)
			if err != nil {
				return util.ReturnError(fmt.Sprintf("Failed to stop dn in the %s stage", util.RecoveryStageCloneDn), err)
			}

			modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageCreateJob

		// Step 2: delete the previous recovery job.
		case util.RecoveryStageCreateJob:
			job, err := r.K8sClient.GetJob(brRecovery.JobName, br.Namespace)
			if err != nil {
				if errors.IsNotFound(err) {
					modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageReadLog
					continue
				}
				return util.ReturnError(fmt.Sprintf("Get recovery job failed in the %s stage", util.RecoveryStageCreateJob), err)
			}
			// A job still exists and has to be deleted first.
			if err := r.K8sClient.DeleteJob(job); err != nil {
				return util.ReturnError(fmt.Sprintf("Delete recovery job failed in the %s stage", util.RecoveryStageCreateJob), err)
			}

		// Step 3: create the recovery job and check the recovery log.
		case util.RecoveryStageReadLog:
			job, err := r.K8sClient.GetJob(brRecovery.JobName, br.Namespace)
			if err != nil {
				if !errors.IsNotFound(err) {
					return util.ReturnError(fmt.Sprintf("Get recovery job failed in the %s stage", util.RecoveryStageReadLog), err)
				}
				// The job does not exist yet: create it.
				newRecovery := r.newRecoveryJob(br, brRecovery)
				modifyBr.Spec.Recoveries[i].JobName = getManualRecoveryJobName(brRecovery.PodName)
				if err := r.K8sClient.Create(newRecovery); err != nil {
					return util.ReturnError(fmt.Sprintf("Create recovery job failed in the %s stage", util.RecoveryStageCreateJob), err)
				}
				continue
			}

			if job.Status.Succeeded == 1 {
				recoveryPodList, err := r.K8sClient.GetPodList(job.Namespace, job.Spec.Selector.MatchLabels)
				if err != nil {
					modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageFailed
					klog.Error(fmt.Sprintf("%s Get recovery pod list failed in the %s stage", brRecovery.PodName, util.RecoveryStageReadLog))
					continue
				}
				for _, pod := range recoveryPodList {
					if pod.Status.Phase != util.JobPodCompletedStatus {
						continue
					}
					recoveryLog, err := r.K8sClient.GetPodLog(pod.Namespace, pod.Name)
					if err != nil {
						klog.Error(fmt.Sprintf("%s Get recovery pod log failed in the %s stage", brRecovery.PodName, util.RecoveryStageReadLog))
						continue
					}
					if strings.Contains(recoveryLog, util.RecoverySucceedLogSign) {
						modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageStartDn
					} else {
						klog.Error(fmt.Sprintf("%s Read recovery log failed in the %s stage", brRecovery.PodName, util.RecoveryStageReadLog))
						modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageFailed
					}
				}
			}
			if job.Status.Failed == 1 {
				// Sprintf, not Printf: Printf writes to stdout and would make
				// klog record the byte count and a nil error instead of the
				// message.
				klog.Error(fmt.Sprintf("%s recovery job failed", brRecovery.PodName))
				modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageFailed
			}

		// Step 4: start the data node.
		case util.RecoveryStageStartDn:
			// Get the data node id.
			nodeId, err := r.K8sClient.ExecCommand(br.Namespace, util.ReplaceData(util.Command(util.CommandGetDataNodeId), brRecovery.PodName), brRecovery.PodName)
			if err != nil {
				return util.ReturnError(fmt.Sprintf("Get node id failed in the %s stage", util.RecoveryStageStartDn), err)
			}
			nodeId = strings.Replace(nodeId, "\n", "", -1)

			// Start the data node in the background; the outcome is checked
			// in the next stage. Pod name and node id are passed as
			// arguments so the goroutine does not observe a later loop
			// iteration (loop variables are shared before Go 1.22), and it
			// uses its own err instead of writing to the enclosing one.
			go func(podName, id string) {
				if _, err := r.K8sClient.ExecCommand(br.Namespace, util.ReplaceData(util.Command(util.CommandStartDn), id), podName); err != nil {
					klog.Error(fmt.Sprintf("%s Failed to start dn in the %s stage", podName, util.RecoveryStageStartDn), err)
				}
			}(brRecovery.PodName, nodeId)

			modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageJudgeDnStatus

		// Step 5: check whether the data node started normally.
		case util.RecoveryStageJudgeDnStatus:
			commandGetDnStatus := util.ReplaceData(util.Command(util.CommandGetInstanceStatus), brRecovery.PodName)
			executeInstanceState, err := r.K8sClient.ExecCommand(br.Namespace, commandGetDnStatus, brRecovery.PodName)
			if err != nil || strings.Replace(executeInstanceState, "\n", "", -1) != util.DnStateNormal {
				klog.Error(fmt.Sprintf("%s dn status error in the %s stage", brRecovery.PodName, util.RecoveryStageJudgeDnStatus), err)
			} else {
				modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageSucceed
			}

		// Stop the recovery job.
		case util.RecoveryStageStopJob:
			job, err := r.K8sClient.GetJob(getManualRecoveryJobName(brRecovery.PodName), br.Namespace)
			if err != nil && !errors.IsNotFound(err) {
				return util.ReturnError(fmt.Sprintf("Get recovery job in the %s stage", util.RecoveryStageStopJob), err)
			}
			if !errors.IsNotFound(err) {
				if err := r.K8sClient.DeleteJob(job); err != nil {
					return util.ReturnError(fmt.Sprintf("Delete recovery job in the %s stage", util.RecoveryStageStopJob), err)
				}
			}
			modifyBr.Spec.Recoveries[i].Status = util.RecoveryStageStopJobEnd
		}
	}

	if !reflect.DeepEqual(modifyBr, br) {
		if err := r.K8sClient.UpdateOpenGaussBackupRecoveryObject(modifyBr, br); err != nil {
			return err
		}
	}

	return nil
}

// equalsWithCronBackup reports whether br already reflects the cluster's
// transfer settings, backup schedule and backup/recovery image, and whether
// its scheduled-backup pod name equals podName.
func equalsWithCronBackup(gs *gsv1.OpenGaussCluster, br *gsv1.OpenGaussBackupRecovery, podName string) bool {
	return equalsTransfer(gs.Spec.BackRecovery.Transfer, br.Spec.Transfer) &&
		gs.Spec.BackRecovery.CronBackupTime == br.Spec.CronBackup.Time &&
		br.Spec.CronBackup.PodName == podName &&
		br.Spec.Image == gs.Spec.Images.GaussBackupRecovery
}

// equalsTransfer reports whether two transfer settings have the same address
// and mode.
func equalsTransfer(gsTransfer gsv1.Transfer, brTransfer gsv1.Transfer) bool {
	sameAddress := gsTransfer.Address == brTransfer.Address
	sameMode := gsTransfer.Mode == brTransfer.Mode
	return sameAddress && sameMode
}

// isEmpty reports whether backRecovery is deeply equal to the zero value of
// gsv1.BackRecoverySpec.
func isEmpty(backRecovery gsv1.BackRecoverySpec) bool {
	var zero gsv1.BackRecoverySpec
	return reflect.DeepEqual(backRecovery, zero)
}
